package com.njcb.ams.store.concurrent.queue;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A bounded, blocking key/value store: producers insert a value under a
 * message id, and consumers retrieve (and remove) the value stored under a
 * specific message id, optionally waiting until it shows up.
 *
 * <p>All mutating operations of this class run while holding {@link #lock};
 * the element counter {@link #count} is only read and written under that lock.
 * Two conditions are used: {@code notFull} wakes producers waiting for free
 * capacity, {@code isChange} wakes consumers waiting for a key to arrive.
 *
 * @author liuyanlong 20170510
 *
 * @param <E> type of the stored values
 */
public class MappingBlockingQueue<E> implements java.io.Serializable {

	private static final long serialVersionUID = -817911632652898426L;

	/**
	 * Upper bound for the initial table size of the backing map. The logical
	 * capacity may be as large as {@code Integer.MAX_VALUE}; pre-sizing the map
	 * to that value would try to allocate a huge table.
	 */
	private static final int MAX_INITIAL_MAP_CAPACITY = 16;

	final ConcurrentMap<String, E> items;
	/** Number of entries currently stored; guarded by {@link #lock}. */
	int count = 0;
	/** Maximum number of entries the queue accepts. */
	int queueMax = Integer.MAX_VALUE;
	final ReentrantLock lock;
	/** Signalled when an entry is removed, i.e. capacity became available. */
	private final Condition notFull;
	/** Signalled when an entry is added, i.e. a waiting consumer should re-check its key. */
	private final Condition isChange;

	/**
	 * Creates a queue with a capacity of {@code Integer.MAX_VALUE}.
	 */
	public MappingBlockingQueue() {
		this(Integer.MAX_VALUE);
	}

	/**
	 * Creates a queue with the given capacity.
	 *
	 * @param capacity maximum number of entries; must be positive
	 * @throws IllegalArgumentException if {@code capacity <= 0}
	 */
	public MappingBlockingQueue(int capacity) {
		if (capacity <= 0) {
			throw new IllegalArgumentException();
		}
		queueMax = capacity;
		// Only use the capacity as a sizing hint when it is small; the map grows on demand.
		this.items = new ConcurrentHashMap<String, E>(Math.min(capacity, MAX_INITIAL_MAP_CAPACITY));
		lock = new ReentrantLock();
		notFull = lock.newCondition();
		isChange = lock.newCondition();
	}

	private static void checkNotNull(Object v) {
		if (v == null) {
			throw new NullPointerException();
		}
	}

	/**
	 * Stores {@code x} under {@code messageId} and wakes all waiting consumers.
	 * Must be called with {@link #lock} held.
	 */
	private void enqueue(String messageId, E x) {
		E previous = items.put(messageId, x);
		// Replacing the value of an existing key does not add an entry.
		if (previous == null) {
			count++;
		}
		// Waiters may be waiting for different keys, so every one of them has to re-check.
		isChange.signalAll();
	}

	/**
	 * Removes and returns the value stored under {@code messageId}, or
	 * {@code null} if there is none. Must be called with {@link #lock} held.
	 */
	private E dequeue(String messageId) {
		E x = items.remove(messageId);
		if (x != null) {
			count--;
			// One slot was freed: let one blocked producer proceed.
			notFull.signal();
		}
		return x;
	}

	/**
	 * Inserts the value, throwing if the queue is full.
	 *
	 * @param messageId key to store the value under; must not be null
	 * @param e         value to store; must not be null
	 * @return {@code true} always (an exception is thrown on failure)
	 * @throws IllegalStateException if the queue is full
	 * @throws NullPointerException  if {@code messageId} or {@code e} is null
	 */
	public boolean add(String messageId, E e) {
		if (offer(messageId, e)) {
			return true;
		} else {
			throw new IllegalStateException("Queue full,size:" + items.size());
		}
	}

	/**
	 * Inserts the value without waiting.
	 *
	 * @param messageId key to store the value under; must not be null
	 * @param e         value to store; must not be null
	 * @return {@code true} if stored, {@code false} if the queue is full
	 * @throws NullPointerException if {@code messageId} or {@code e} is null
	 */
	public boolean offer(String messageId, E e) {
		checkNotNull(messageId);
		checkNotNull(e);
		lock.lock();
		try {
			if (count >= queueMax) {
				return false;
			}
			enqueue(messageId, e);
			return true;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Inserts the value, waiting up to the given time for capacity to become
	 * available.
	 *
	 * @param messageId key to store the value under; must not be null
	 * @param e         value to store; must not be null
	 * @param timeout   maximum time to wait
	 * @param unit      unit of {@code timeout}
	 * @return {@code true} if stored, {@code false} if the queue was still full
	 *         when the timeout elapsed
	 * @throws InterruptedException if interrupted while waiting
	 * @throws NullPointerException if {@code messageId} or {@code e} is null
	 */
	public boolean offer(String messageId, E e, long timeout, TimeUnit unit) throws InterruptedException {
		checkNotNull(messageId);
		checkNotNull(e);
		long nanos = unit.toNanos(timeout);
		lock.lockInterruptibly();
		try {
			while (count >= queueMax) {
				if (nanos <= 0) {
					return false;
				}
				nanos = notFull.awaitNanos(nanos);
			}
			enqueue(messageId, e);
			return true;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Wakes all consumers blocked in the timed {@code poll} so that they
	 * re-check their key, provided the queue currently holds at least one entry.
	 */
	public void signalChange() {
		lock.lock();
		try {
			if (items.size() > 0) {
				isChange.signalAll();
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Inserts the value, waiting as long as necessary for capacity to become
	 * available.
	 *
	 * @param messageId key to store the value under; must not be null
	 * @param e         value to store; must not be null
	 * @throws InterruptedException if interrupted while waiting
	 * @throws NullPointerException if {@code messageId} or {@code e} is null
	 */
	public void put(String messageId, E e) throws InterruptedException {
		checkNotNull(messageId);
		checkNotNull(e);
		lock.lockInterruptibly();
		try {
			// Block only while the queue is at capacity.
			while (count >= queueMax) {
				notFull.await();
			}
			enqueue(messageId, e);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes and returns the value stored under {@code messageId} without
	 * waiting.
	 *
	 * @param messageId key to look up; must not be null
	 * @return the removed value, or {@code null} if no value is stored under the key
	 * @throws NullPointerException if {@code messageId} is null
	 */
	public E poll(String messageId) {
		checkNotNull(messageId);
		lock.lock();
		try {
			return dequeue(messageId);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes and returns the value stored under {@code messageId}, waiting up
	 * to the given time for it to arrive.
	 *
	 * @param messageId key to look up; must not be null
	 * @param timeout   maximum time to wait
	 * @param unit      unit of {@code timeout}
	 * @return the removed value, or {@code null} if none arrived before the
	 *         timeout elapsed
	 * @throws InterruptedException if interrupted while waiting
	 * @throws NullPointerException if {@code messageId} is null
	 */
	public E poll(String messageId, long timeout, TimeUnit unit) throws InterruptedException {
		checkNotNull(messageId);
		long nanos = unit.toNanos(timeout);
		lock.lockInterruptibly();
		try {
			E ret;
			// Re-check the key after every wake-up; a wake-up may be for a different key.
			while ((ret = dequeue(messageId)) == null) {
				if (nanos <= 0) {
					return null;
				}
				nanos = isChange.awaitNanos(nanos);
			}
			return ret;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * @return the number of entries currently stored
	 */
	public int size() {
		lock.lock();
		try {
			return count;
		} finally {
			lock.unlock();
		}
	}

	@Override
	public String toString() {
		return items.toString();
	}

	/**
	 * Removes the entry stored under {@code messageId}, if any.
	 *
	 * @param messageId key to remove; {@code null} is treated as absent
	 * @return {@code true} if an entry was actually removed
	 */
	public boolean remove(String messageId) {
		if (messageId == null) {
			return false;
		}
		lock.lock();
		try {
			return dequeue(messageId) != null;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Tells whether a value is currently stored under {@code messageId}.
	 *
	 * @param messageId key to look up; {@code null} is treated as absent
	 * @return {@code true} if the key is present
	 */
	public boolean contains(String messageId) {
		if (messageId == null) {
			return false;
		}
		lock.lock();
		try {
			return items.containsKey(messageId);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes all entries and wakes every producer waiting for capacity.
	 */
	public void clear() {
		lock.lock();
		try {
			items.clear();
			count = 0;
			// All slots are free again, so every blocked producer may retry.
			notFull.signalAll();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Returns the live backing map (not a copy).
	 *
	 * <p>NOTE(review): modifying the returned map directly bypasses the lock and
	 * the element counter maintained by this class; callers should treat it as
	 * read-only.
	 *
	 * @return the backing map
	 */
	public ConcurrentMap<String, E> getItems() {
		return items;
	}

}
